package site.teamo.biu.flink.function.process;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import site.teamo.biu.flink.sdk.annotation.function.BFProcessFunction;
import site.teamo.biu.flink.sdk.function.AbstractProcessFunction;
import site.teamo.biu.flink.sdk.job.JobContext;

/**
 * Process function that turns every incoming element into a fastjson2
 * {@link JSONObject}.
 *
 * <p>The conversion is a round trip: the element is serialized to a JSON string
 * and that string is parsed back, so the emitted object is built from the
 * element's JSON text rather than from the element instance itself.
 *
 * @param <IN> type of the incoming elements
 * @author haocongshun
 * @date 2023/07/27 14:44:32
 */
@BFProcessFunction(out = JSONObject.class)
public class Convert2JSONFunction<IN> extends AbstractProcessFunction<IN, JSONObject> {

    /**
     * Creates the function; both arguments are handed unchanged to the parent constructor.
     *
     * @param name    name forwarded to {@link AbstractProcessFunction}
     * @param context job context forwarded to {@link AbstractProcessFunction}
     */
    public Convert2JSONFunction(String name, JobContext context) {
        super(name, context);
    }

    /**
     * Converts one element to a {@link JSONObject} and emits it.
     *
     * @param in        element to convert
     * @param context   per-element context; not used by this function
     * @param collector receives the converted object
     * @throws Exception if serializing or parsing fails; nothing is caught here
     */
    @Override
    public void processElement(
            IN in,
            ProcessFunction<IN, JSONObject>.Context context,
            Collector<JSONObject> collector) throws Exception {
        // Step 1: render the element as JSON text.
        final String jsonText = JSON.toJSONString(in);
        // Step 2: parse that text back into a generic JSON object.
        final JSONObject converted = JSON.parseObject(jsonText);
        // Step 3: hand the result downstream.
        collector.collect(converted);
    }
}
